[SPARK-56395][CONNECT][PYTHON] Add NEAREST BY DataFrame API #55682
dilipbiswal wants to merge 7 commits into
| session=self._session, | ||
| ) | ||
|
|
||
| def nearestByJoin( |
we need Spark connect tests for nearestByJoin - see lateralJoin tests in DataFrameSubquerySuite and PlanGenerationTestSuite
| } | ||
|
|
||
| private[sql] object Dataset { | ||
| // Acceptance lists for `nearestByJoin`. Must stay aligned with `NearestByJoinType` / |
how do we keep these in sync? is there a good way to share the same validation list? like move the lists to sql/api so that both sql/connect and sql/catalyst could reuse it.
| ... [("A", 11.0), ("B", 22.0), ("C", 5.0)], ["product", "pscore"]) | ||
| >>> users.nearestByJoin( | ||
| ... products, -sf.abs(users.score - products.pscore), 1, "exact", "similarity" | ||
| ... ).select("user_id", "product").orderBy("user_id").show() |
the doctest will only cover the happy path with default inner join - can we add more tests similar to
| // cannot import. | ||
| private val MaxNumResults: Int = 100000 | ||
| private val SupportedJoinTypeDisplay = "'INNER', 'LEFT OUTER'" | ||
| private val SupportedJoinTypes = Set("inner", "leftouter", "left", "left_outer") |
why do we need both leftouter and left_outer?
| private val SupportedModes = Seq("approx", "exact") | ||
| private val SupportedDirections = Seq("distance", "similarity") | ||
|
|
||
| private[connect] def validateNearestByJoinArgs( |
Looks like we are missing similar validations in the Python client?
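For illustration, a minimal sketch of what equivalent client-side validation could look like in Python. The accepted values and the upper bound are taken from the Scala snippet quoted above; the function name, the lower bound of 1, the case-insensitive matching, and the use of `ValueError` (rather than the `NEAREST_BY_JOIN.*` error classes) are assumptions, not the PR's actual code.

```python
# Hypothetical sketch of client-side argument checks for nearestByJoin.
# Constants mirror the Scala lists quoted in this thread; everything else
# (names, lower bound, case handling, exception type) is an assumption.

MAX_NUM_RESULTS = 100000
SUPPORTED_JOIN_TYPES = {"inner", "leftouter", "left", "left_outer"}
SUPPORTED_MODES = ("approx", "exact")
SUPPORTED_DIRECTIONS = ("distance", "similarity")


def validate_nearest_by_join_args(num_results, mode, direction, join_type="inner"):
    """Raise ValueError on the first invalid argument; return normalized strings."""
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(num_results, bool) or not isinstance(num_results, int):
        raise ValueError(f"numResults must be an int, got {num_results!r}")
    if not 1 <= num_results <= MAX_NUM_RESULTS:
        raise ValueError(
            f"numResults must be between 1 and {MAX_NUM_RESULTS}, got {num_results}"
        )

    join_type_norm = join_type.lower()
    if join_type_norm not in SUPPORTED_JOIN_TYPES:
        raise ValueError(f"Unsupported joinType: {join_type!r}")

    mode_norm = mode.lower()
    if mode_norm not in SUPPORTED_MODES:
        raise ValueError(f"Unsupported mode: {mode!r}")

    direction_norm = direction.lower()
    if direction_norm not in SUPPORTED_DIRECTIONS:
        raise ValueError(f"Unsupported direction: {direction!r}")

    return join_type_norm, mode_norm, direction_norm
```

Running the checks before building the plan lets the caller fail fast without a server round-trip, which is the motivation given for the Scala side.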
zhidongqu-db
left a comment
I would closely examine implementation and existing tests for lateral join and try to mirror that here
275687d to
17e65ad
Compare
zhidongqu-db
left a comment
overall lgtm, will tag someone who is more familiar with spark connect to take a look as well.
| private[sql] object NearestByJoinValidation { | ||
|
|
||
| /** Upper bound on `numResults`. Mirrors the K-overload limit of `MaxMinByK`. */ | ||
| val MaxNumResults: Int = 100000 |
can this and
share the same constant? sql/catalyst depends on sql-api
cloud-fan
left a comment
This PR rounds out the DataFrame surface for NEAREST BY introduced in SPARK-56395.
Prior state and problem. Catalyst NearestByJoin plan node, RewriteNearestByJoin rewrite, error classes, and the SQL parser path (APPROX|EXACT NEAREST K BY DISTANCE|SIMILARITY <expr>) were already in master. There was no programmatic entry point — only SQL. PySpark and Spark Connect had no surface at all.
Design approach. Add nearestByJoin(...) overloads on the Dataset trait (sql-api) so all editions inherit it; implement once each in classic (sql/core) and connect (sql/connect/common). For Connect, mirror the AsOfJoin proto pattern (string-typed join_type/mode/direction, server-side validation in SparkConnectPlanner). Pre-validate client-side in both Scala connect (Dataset.validateNearestByJoinArgs) and Python connect (NearestByJoin.__init__) so users see the same NEAREST_BY_JOIN.* error classes without a server round-trip.
Key design decisions.
- Acceptance lists factored into `NearestByJoinValidation` (sql-api). Both `sql/connect/common` (which can't depend on sql/catalyst) and `sql/catalyst` (where `NearestByJoinType.apply` etc. live) reference the same constants. The catalyst `apply` methods now hold thin re-exports of `supported`/`supportedDisplay` for source compatibility.
- Server-side translator routes through the DataFrame API, not by constructing `logical.NearestByJoin` directly, the same as `transformAsOfJoin`. This means proto handling never goes "behind" the public validator.
- Strings (not enums) for joinType/mode/direction over the wire, explicitly cited as parity with AsOfJoin.
Implementation sketch.
- sql-api `Dataset` declares 2 abstract overloads (5-arg + 6-arg with explicit `joinType`); Mima exclude added.
- Classic `Dataset.nearestByJoin` checks `numResults` range, calls `NearestByJoinType/Mode/Direction.apply` to coerce strings (these throw the analysis-time error classes), then constructs `NearestByJoin` logical plan.
- Connect `Dataset.nearestByJoinImpl` calls `Dataset.validateNearestByJoinArgs` (same checks as the catalyst `apply` methods) and emits `NearestByJoin` proto.
- `SparkConnectPlanner.transformNearestByJoin` checks the proto string fields are non-empty, then calls `left.nearestByJoin(...)` on the server-side classic Dataset.
- Python `connect/plan.py` `NearestByJoin` open-codes the same validation (constants documented as needing to stay in sync with `NearestByJoinValidation`).
- Test coverage spans both editions via a shared mixin (classic via `ReusedSQLTestCase`, Connect via `ReusedConnectTestCase`), plus a Connect-jvm end-to-end suite, two `PlanGenerationTestSuite` entries with stable explain artifacts, and a sql/core suite covering the rewrite-internal-column hygiene and all error paths.
A few small doc / test-comment polish items below — none of them block.
| // server round-trip. Acceptance lists must stay aligned with `NearestByJoinType` / | ||
| // `NearestByJoinMode` / `NearestByDirection` in sql/catalyst, which `sql/connect/common` | ||
| // cannot import. |
After extracting NearestByJoinValidation, both sides reference the same source — there are no separate lists left to "stay aligned" with. The bit worth keeping is that the validation logic here mirrors what NearestByJoinType.apply etc. do server-side, since sql/connect/common can't import sql/catalyst.
| // server round-trip. Acceptance lists must stay aligned with `NearestByJoinType` / | |
| // `NearestByJoinMode` / `NearestByDirection` in sql/catalyst, which `sql/connect/common` | |
| // cannot import. | |
| // server round-trip. The validation logic mirrors `NearestByJoinType.apply` / | |
| // `NearestByJoinMode.apply` / `NearestByDirection.apply` in sql/catalyst, which | |
| // `sql/connect/common` cannot import; the acceptance lists themselves are shared via | |
| // `NearestByJoinValidation` in sql-api. |
| // For each row on the left side, returns up to `num_results` rows from the right side ordered | ||
| // by `ranking_expression`. |
Top-K, not a sort — output rows aren't guaranteed to be ordered. The Scaladoc and Python docstring already say "ranked by".
| // For each row on the left side, returns up to `num_results` rows from the right side ordered | |
| // by `ranking_expression`. | |
| // For each row on the left side, returns up to `num_results` rows from the right side ranked | |
| // by `ranking_expression`. |
| ``exact`` forces brute-force evaluation and requires the ranking expression to be | ||
| deterministic. | ||
| direction : str | ||
| ``"distance"`` (smallest values first) or ``"similarity"`` (largest values first). |
Python uses the plural form here, but the Scala 5-arg and 6-arg overloads at sql/api/.../Dataset.scala:941,971 use singular ("smallest value first" / "largest value first"). Either is fine; consistency matters more.
| ``"distance"`` (smallest values first) or ``"similarity"`` (largest values first). | |
| ``"distance"`` (smallest value first) or ``"similarity"`` (largest value first). |
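To make the "ranked by, not ordered by" and `direction` semantics concrete, here is a small pure-Python reference model (not Spark code). It assumes an inner-join flavour only; the helper name `nearest_by` and the `users` rows are hypothetical, while the product rows and the ranking expression follow the doctest quoted earlier in this thread.

```python
import heapq


def nearest_by(left_rows, right_rows, ranking, num_results, direction):
    """For each left row, keep up to num_results right rows chosen by rank.

    "distance" keeps the smallest ranking values, "similarity" the largest.
    The result is a top-K selection; callers should not rely on output order.
    """
    if direction not in ("distance", "similarity"):
        raise ValueError(f"Unsupported direction: {direction!r}")
    pick = heapq.nsmallest if direction == "distance" else heapq.nlargest
    out = []
    for left in left_rows:
        best = pick(num_results, right_rows, key=lambda right: ranking(left, right))
        out.extend((left, right) for right in best)
    return out


users = [(1, 10.0), (2, 20.0)]  # hypothetical (user_id, score) rows
products = [("A", 11.0), ("B", 22.0), ("C", 5.0)]  # (product, pscore) from the doctest

matches = nearest_by(
    users, products, lambda u, p: -abs(u[1] - p[1]), 1, "similarity"
)
# Sorted here only to get a stable printout; the model itself promises no order.
pairs = sorted((u[0], p[0]) for u, p in matches)
print(pairs)  # [(1, 'A'), (2, 'B')]
```

Negating the absolute difference and asking for `"similarity"` selects the same rows as using the plain absolute difference with `"distance"`.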
|
|
||
| test("self-join: each row finds nearest other rows in the same DataFrame") { | ||
| val (users, _) = prepareForNearestByJoin() | ||
| // For each user, find the 1 other user with the closest score (excluding self by ranking). |
The comment says "find the 1 other user with the closest score (excluding self by ranking)", but the ranking expression -abs(users("score") - users("score")) resolves to left.score - left.score (both column references were captured from the same Dataset before DeduplicateRelations re-IDs the right side), so the rank is identically 0 for every candidate — top-K just returns any 2 of the 3 right rows; there is no self-exclusion. The test still does what it needs to (verifies the self-join resolves at all), but the comment overstates what's verified.
| // For each user, find the 1 other user with the closest score (excluding self by ranking). | |
| // We pass `users` as both sides; DeduplicateRelations rewrites the right side to | |
| // generate fresh ExprIds, so the join resolves. Both `users("score")` references in | |
| // the ranking expression bind to the original (left) attribute, so the rank is | |
| // identically 0 for every candidate -- this test exercises self-join resolution, | |
| // not nearest-row selection. |
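The point about the rank being identically 0 can be illustrated with a tiny pure-Python model. The rows and function names are hypothetical; the only thing modelled is that both column references bind to the left attribute.

```python
users = [(1, 10.0), (2, 20.0), (3, 30.0)]  # hypothetical (user_id, score) rows


def intended_rank(left, right):
    # What the old comment implied: compare the left score with the right score.
    return -abs(left[1] - right[1])


def actual_rank(left, right):
    # What the expression resolves to: both references bind to the left row,
    # so the right row never influences the rank.
    return -abs(left[1] - left[1])


actual = {actual_rank(l, r) for l in users for r in users}
intended = {intended_rank(l, r) for l in users for r in users}

# Every candidate ties under the actual expression, so top-K has nothing to rank on.
assert all(rank == 0 for rank in actual)
# The intended expression would distinguish candidates.
assert len(intended) > 1
```

With every candidate tied, any selection of K right rows satisfies the top-K contract, which is why the test only demonstrates that the self-join resolves.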
|
@zhidongqu-db @cloud-fan - kindly double check when you get a chance please. |
cloud-fan
left a comment
4 addressed, 0 remaining, 1 new (1 newly introduced) — all four prior findings (relations.proto "ordered"→"ranked", dataframe.py singular value first, connect/Dataset.scala shared-list clarification, and the misleading self-join test comment) are resolved. The self-join comment edit left a duplicate block; minor inline suggestion below.
| // We pass `users` as both sides; DeduplicateRelations rewrites the right side to | ||
| // generate fresh ExprIds, so the join resolves. Both `users("score")` references in | ||
| // the ranking expression bind to the original (left) attribute, so the rank is | ||
| // identically 0 for every candidate -- this test exercises self-join resolution, | ||
| // not nearest-row selection. | ||
| // We pass `users` as both sides; DeduplicateRelations should rewrite the right side to | ||
| // generate fresh ExprIds, allowing the join to resolve. |
The replacement for the old misleading comment was added at lines 107-111, but the prior block at 112-113 ("DeduplicateRelations should rewrite ... allowing the join to resolve") was not deleted, so the same point now appears twice. Suggest collapsing to just the new block:
| // We pass `users` as both sides; DeduplicateRelations rewrites the right side to | |
| // generate fresh ExprIds, so the join resolves. Both `users("score")` references in | |
| // the ranking expression bind to the original (left) attribute, so the rank is | |
| // identically 0 for every candidate -- this test exercises self-join resolution, | |
| // not nearest-row selection. | |
| // We pass `users` as both sides; DeduplicateRelations should rewrite the right side to | |
| // generate fresh ExprIds, allowing the join to resolve. | |
| // We pass `users` as both sides; DeduplicateRelations rewrites the right side to | |
| // generate fresh ExprIds, so the join resolves. Both `users("score")` references in | |
| // the ranking expression bind to the original (left) attribute, so the rank is | |
| // identically 0 for every candidate -- this test exercises self-join resolution, | |
| // not nearest-row selection. |
Builds on SPARK-56395 (catalyst-side, prior PR). Adds the DataFrame `nearestByJoin` method in Scala / Java / PySpark, the corresponding Spark Connect proto and server/client wiring, and the end-to-end DataFrame test suite.
…on tests, doc alignment
c4766da to
95bd1a4
Compare
|
@cloud-fan - kindly help merge this when you get time. |
|
thanks, merging to master/4.x/4.2 (feature parity between SQL and dataframe)! |
### What changes were proposed in this pull request?
Builds on the catalyst-side merged in SPARK-56395 [(link).](#55629) Adds the DataFrame `nearestByJoin` method in Scala / Java / PySpark and wires up Spark Connect:
### Why are the changes needed
API completeness. The prior PR exposed `NEAREST BY` only via SQL; this PR brings the same capability to DataFrame / PySpark / Spark Connect.
### Does this PR introduce _any_ user-facing change?
// Scala
```
users.nearestByJoin(
  products,
  -abs(users("score") - products("pscore")),
  numResults = 1,
  mode = "exact",
  direction = "similarity",
  joinType = "leftouter")
```
// PySpark
```
users.nearestByJoin(
    products,
    -sf.abs(users.score - products.pscore),
    1,
    "exact",
    "similarity",
    joinType="leftouter",
).select("user_id", "product").show()
```
### How was this patch tested?
DataFrameNearestByJoinSuite, RewriteNearestByJoinSuite, python doctests
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.7), human-reviewed and tested
Closes #55682 from dilipbiswal/SPARK-56395-DF-CONNECT2.
Authored-by: Dilip Biswal <dkbiswal@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 13380e7)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
| ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.DataStreamReader.name"), | ||
| // [SPARK-34591][ML] Add pruneTree parameter to Strategy | ||
| ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.tree.configuration.Strategy.this") | ||
| ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.tree.configuration.Strategy.this"), |
branch-4.2 ci failed:
[info] Building Spark using SBT with these arguments: -Phadoop-3 -Pkubernetes -Pjvm-profiler -Pkinesis-asl -Phadoop-cloud -Phive-thriftserver -Pyarn -Pdocker-integration-tests -Phive -Pspark-ganglia-lgpl -Pvolcano Test/package streaming-kinesis-asl-assembly/assembly connect/assembly
sbt build spark
Using /opt/hostedtoolcache/Java_Zulu_jdk/17.0.19-10/x64 as default JAVA_HOME.
Note, this will be overridden by -java-home if it is set.
Using SPARK_LOCAL_IP=localhost
[info] welcome to sbt 1.12.8 (Azul Systems, Inc. Java 17.0.19)
[info] loading settings for project spark-build from plugins.sbt...
[info] loading project definition from /home/runner/work/spark/spark/project
[info] compiling 3 Scala sources to /home/runner/work/spark/spark/project/target/scala-2.12/sbt-1.0/classes ...
[error] /home/runner/work/spark/spark/project/MimaExcludes.scala:62:19: ')' expected but '.' found.
[error] ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.Dataset.nearestByJoin")
[error] ^
[error] /home/runner/work/spark/spark/project/MimaExcludes.scala:63:3: ';' expected but ')' found.
[error] )
[error] ^
[error] two errors found
[error] (Compile / compileIncremental) Compilation failed
[warn] Project loading failed: (r)etry, (q)uit, (l)ast, or (i)gnore? (default: r)
Invalid response: '"'
[warn] Project loading failed: (r)etry, (q)uit, (l)ast, or (i)gnore? (default: r)
Error: running /home/runner/work/spark/spark/build/sbt -Phadoop-3 -Pkubernetes -Pjvm-profiler -Pkinesis-asl -Phadoop-cloud -Phive-thriftserver -Pyarn -Pdocker-integration-tests -Phive -Pspark-ganglia-lgpl -Pvolcano Test/package streaming-kinesis-asl-assembly/assembly connect/assembly ; received return code 1
Error: Process completed with exit code 16.
Could you please fix this? @dilipbiswal
@cloud-fan very sorry Wenchen. I missed this ping. Thanks a lot for taking care of this.
…nch-4.2
### What changes were proposed in this pull request?
Add a missing trailing comma to the `DataStreamReader.name` MiMa exclude entry in `project/MimaExcludes.scala` on branch-4.2. The cherry-pick of SPARK-56395 (commit 2c356ac) added a new `Dataset.nearestByJoin` exclude entry to `v42excludes` but did not append the required comma to the preceding `DataStreamReader.name` entry.
### Why are the changes needed?
Without the comma, SBT project loading fails on branch-4.2:
```
[error] /home/runner/work/spark/spark/project/MimaExcludes.scala:62:19: ')' expected but '.' found.
[error] ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.Dataset.nearestByJoin")
[error] ^
[error] /home/runner/work/spark/spark/project/MimaExcludes.scala:63:3: ';' expected but ')' found.
[error] )
[error] ^
[error] two errors found
[error] (Compile / compileIncremental) Compilation failed
```
This breaks all CI builds on branch-4.2. Reported by LuciferYang in #55682 (comment).
This change is branch-4.2 only; master is unaffected because the `nearestByJoin` entry has not been merged to master yet.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing CI. The fix restores compilability of the SBT project definition.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.7)
Closes #55873 from cloud-fan/cloud-fan/SPARK-56395-branch-4.2-fix.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Add a fail-hard check in `dev/merge_spark_pr.py`: after fetching the JIRA summary for each `SPARK-NNNN` referenced in the PR title, abort the merge if any linked issue type is `Epic` or `Umbrella`. All offenders are collected in one pass and reported together so the committer can fix every blocker before re-running.
Sample failure message:
Cannot merge PR #56137. Linked JIRA(s) SPARK-1234 (Umbrella), SPARK-5678 (Epic) are Umbrella or Epic tickets and MUST not be resolved by a single PR. File Sub-task(s) under SPARK-1234 (Umbrella), SPARK-5678 (Epic) and update the PR title to reference the Sub-task(s) instead.
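As a rough sketch of the check described above (not the actual code in `dev/merge_spark_pr.py`): collect every linked Epic/Umbrella in one pass, then build a single message. The function names, the regex-based title parsing, and the dictionary standing in for the JIRA lookup are all assumptions made for illustration.

```python
import re

BLOCKED_ISSUE_TYPES = ("Umbrella", "Epic")


def find_blocked_issues(pr_title, issue_type_of):
    """Return [(jira_id, issue_type)] for every linked Epic/Umbrella, in title order."""
    offenders = []
    # dict.fromkeys de-duplicates IDs while preserving their order in the title.
    for jira_id in dict.fromkeys(re.findall(r"SPARK-\d+", pr_title)):
        issue_type = issue_type_of(jira_id)
        if issue_type in BLOCKED_ISSUE_TYPES:
            offenders.append((jira_id, issue_type))
    return offenders


def blocked_merge_message(pr_num, offenders):
    """Format one message that names every offender, so all can be fixed at once."""
    listed = ", ".join(f"{jira_id} ({issue_type})" for jira_id, issue_type in offenders)
    return (
        f"Cannot merge PR #{pr_num}. Linked JIRA(s) {listed} are Umbrella or Epic "
        "tickets and MUST not be resolved by a single PR. File Sub-task(s) under "
        f"{listed} and update the PR title to reference the Sub-task(s) instead."
    )


# Stand-in for the JIRA lookup; a real script would query the issue tracker.
fake_types = {"SPARK-1234": "Umbrella", "SPARK-5678": "Epic", "SPARK-9999": "Sub-task"}
title = "[SPARK-1234][SPARK-5678][SPARK-9999][SQL] Example title"

offenders = find_blocked_issues(title, fake_types.get)
message = blocked_merge_message(56137, offenders) if offenders else ""
print(message)
```

Collecting offenders before failing, instead of aborting on the first one, is what lets the committer fix the whole title in a single retry.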
### Why are the changes needed?
When the merge script completes a merge, it transitions the linked JIRA(s) to **Resolved** and stamps a **Fix Version** on them.
PRs frequently end up referencing the wrong ticket - it is common for a contributor to drop the parent Epic/Umbrella ID into the title when they meant to create or reference a Sub-task. When that slips past the committer, the script silently auto-closes the Epic/Umbrella with a Fix Version, leaving the parent ticket in an incorrect **Resolved** state with a Fix Version it should not have. Cleaning that up later (reopening the Epic, removing the stray Fix Version, retransitioning) is manual and easy to forget.
Recent examples (all Umbrellas auto-resolved by the merge script):
- [SPARK-54137](https://issues.apache.org/jira/browse/SPARK-54137) "Prepare Apache Spark 4.2.0" - the release-prep umbrella. PR #53445 was titled `[SPARK-54137][SQL][CONNECT] Remove redundant observed-metrics responses` at merge time. The script stamped the umbrella **Resolved / Fix Version 4.2.0** on 2025-12-22 - within 13 seconds of the merge commit. A maintainer had to hand-reopen the umbrella and clear the Fix Version on 2026-01-06; the PR was retitled to its proper Sub-task SPARK-54685 after the fact.
- [SPARK-54119](https://issues.apache.org/jira/browse/SPARK-54119) "Metrics & semantic modeling in Spark" - Umbrella stuck at **Resolved / Fix Version 4.2.0** after 4 linked PRs: #55449, #55487, #55983, #56010.
- [SPARK-56395](https://issues.apache.org/jira/browse/SPARK-56395) "SPIP: NEAREST BY Top-K Ranking Join" - Umbrella stuck at **Resolved / Fix Version 4.2.0** after 5 linked PRs: #55629, #55681, #55682, #55688, #55873.
Failing the merge before the JIRA transition runs forces a quick title fix (or a Sub-task creation) at merge time, and keeps Epic/Umbrella status accurate.
### Does this PR introduce _any_ user-facing change?
No. Committer tooling only.
### How was this patch tested?
- `python3 -m py_compile dev/merge_spark_pr.py` succeeds.
- Existing doctests still pass: `python3 -m doctest dev/merge_spark_pr.py` - 57/57.
- The new check only runs when `asf_jira` is initialized, leaving the no-JIRA path unchanged, and the per-ID fetch-error path is preserved.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.7)
Closes #56137 from zhengruifeng/merge-script-block-umbrella-epic-dev3.
Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@foxmail.com>
… LEFT OUTER Followup to apache#55682. In RewriteNearestByJoin, when the NEAREST BY join type is LEFT OUTER, the synthesized Join widens the right-side columns to nullable. However, the synthesized Aggregate (and the optional __ranking__ Project) built on top of that join still referenced the right-side columns via right.output and rankingExpression with their original (non-nullable) nullability. As a result the rewritten plan can declare a right-side column as non-nullable while its child -- the join -- produces it as nullable. This commit maps the right-side attributes to their widened (nullable) form for LEFT OUTER and rewrites both the CreateStruct(right.*) and the ranking expression to use that widened nullability, so the rewritten plan's schema is consistent with its child. For INNER joins the right side is not widened, so this is a no-op.
… LEFT OUTER
### What changes were proposed in this pull request?
Followup to #55682. In `RewriteNearestByJoin`, when the `NEAREST BY` join type is `LEFT OUTER`, the synthesized `Join` widens the right-side columns to nullable. However, the synthesized `Aggregate` (and the optional `__ranking__` `Project`) built on top of that join still referenced the right-side columns via `right.output` and `rankingExpression` with their original (non-nullable) nullability. As a result the rewritten plan can declare a right-side column as non-nullable while its child -- the join -- produces it as nullable.
This PR maps the right-side attributes to their widened (nullable) form for `LEFT OUTER` and rewrites both the `CreateStruct(right.*)` and the ranking expression to use that widened nullability, so the rewritten plan's schema is consistent with its child. For `INNER` joins the right side is not widened, so this is a no-op.
### Why are the changes needed?
Without this fix the rewritten plan for a `LEFT OUTER NEAREST BY` declares right-side columns non-nullable while its join child produces them nullable -- an inconsistency that nullability/plan-integrity validation flags as a regression.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Added a regression test to `RewriteNearestByJoinSuite` that, for both `INNER` and `LEFT OUTER`, asserts every right-side attribute the synthesized `Aggregate` references agrees on nullability with its join child. The test uses **non-nullable** right-side columns so that `LEFT OUTER`'s widening is observable -- it fails without this fix (`x#.. declared nullable=false but its child produces nullable=true`) and passes with it, while `INNER` stays a no-op. The suite's expected-plan helper was also updated to mirror the widened nullability.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #56484 from cloud-fan/SPARK-56395-followup.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
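The nullability mismatch described in this commit message can be modelled in a few lines of plain Python. This is only a toy model of the idea, not Catalyst code: `Attr`, `right_output_after_join`, and `mismatches` are hypothetical names, and join types are represented as plain strings.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Attr:
    name: str
    nullable: bool


def right_output_after_join(right_output, join_type):
    """Nullability the join child produces for right-side attributes."""
    if join_type == "left_outer":
        # Unmatched left rows yield NULLs on the right, so every column widens.
        return [replace(attr, nullable=True) for attr in right_output]
    return list(right_output)  # inner join: nothing is widened


def mismatches(parent_refs, child_output):
    """Names of attributes whose declared nullability disagrees with the child."""
    child = {attr.name: attr for attr in child_output}
    return [a.name for a in parent_refs if a.nullable != child[a.name].nullable]


right = [Attr("x", nullable=False)]  # non-nullable, so widening is observable
joined = right_output_after_join(right, "left_outer")

# Before the fix: the parent still references the original non-nullable attribute.
print(mismatches(right, joined))   # ['x']
# After the fix: the parent references the widened attributes.
print(mismatches(joined, joined))  # []
# Inner join: no widening, so the original references already agree.
print(mismatches(right, right_output_after_join(right, "inner")))  # []
```

Using a non-nullable right-side column in the model mirrors the regression test's choice: with an already-nullable column the widening would be invisible.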
… LEFT OUTER
### What changes were proposed in this pull request?
Followup to #55682. In `RewriteNearestByJoin`, when the `NEAREST BY` join type is `LEFT OUTER`, the synthesized `Join` widens the right-side columns to nullable. However, the synthesized `Aggregate` (and the optional `__ranking__` `Project`) built on top of that join still referenced the right-side columns via `right.output` and `rankingExpression` with their original (non-nullable) nullability. As a result the rewritten plan can declare a right-side column as non-nullable while its child -- the join -- produces it as nullable.
This PR maps the right-side attributes to their widened (nullable) form for `LEFT OUTER` and rewrites both the `CreateStruct(right.*)` and the ranking expression to use that widened nullability, so the rewritten plan's schema is consistent with its child. For `INNER` joins the right side is not widened, so this is a no-op.
### Why are the changes needed?
Without this fix the rewritten plan for a `LEFT OUTER NEAREST BY` declares right-side columns non-nullable while its join child produces them nullable -- an inconsistency that nullability/plan-integrity validation flags as a regression.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Added a regression test to `RewriteNearestByJoinSuite` that, for both `INNER` and `LEFT OUTER`, asserts every right-side attribute the synthesized `Aggregate` references agrees on nullability with its join child. The test uses **non-nullable** right-side columns so that `LEFT OUTER`'s widening is observable -- it fails without this fix (`x#.. declared nullable=false but its child produces nullable=true`) and passes with it, while `INNER` stays a no-op. The suite's expected-plan helper was also updated to mirror the widened nullability.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #56484 from cloud-fan/SPARK-56395-followup.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 62e4e16)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
Builds on the catalyst-side merged in SPARK-56395 (link). Adds the DataFrame `nearestByJoin` method in Scala / Java / PySpark and wires up Spark Connect:
Why are the changes needed
API completeness. The prior PR exposed `NEAREST BY` only via SQL; this PR brings the same capability to DataFrame / PySpark / Spark Connect.
Does this PR introduce any user-facing change?
// Scala
// PySpark
How was this patch tested?
DataFrameNearestByJoinSuite, RewriteNearestByJoinSuite, python doctests
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.7), human-reviewed and tested